package io.sundial.discovery.impl;

import io.loadkit.Loaders;
import io.loadkit.Resource;
import io.sundial.assembly.Assembler;
import io.sundial.core.AtomicRNCommand;
import io.sundial.core.AtomicRTCommand;
import io.sundial.core.AtomicVTCommand;
import io.sundial.core.context.Context;
import io.sundial.core.lifecycle.Lifecycle;
import io.sundial.core.lifecycle.exception.DestroyingException;
import io.sundial.core.lifecycle.exception.InitializingException;
import io.sundial.discovery.Discoverer;
import io.sundial.discovery.event.ChangedEvent;
import io.sundial.discovery.exception.DiscoveringException;
import io.sundial.job.Job;
import io.sundial.util.MultipleEnumeration;
import io.sundial.util.ObjKit;
import io.sundial.util.ResKit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.FileSystem;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Job discoverer that loads jobs from outside the application.
 * <p>
 * Shared libraries ({@code *.jar} files of {@link #getSharedLibFolder()}) are copied into a fresh
 * sub-folder of {@link #getSharedTmpFolder()} and exposed through a {@link URLClassLoader}. Every
 * file found in one of the {@link #getExtendJobFolders()} is disassembled by the {@link Assembler}
 * registered for its suffix and then served by its own discoverer whose class loader has the shared
 * library class loader as parent. All folders are watched, and changes are applied no more often
 * than once per {@link #getReloadMinInterval()} of inactivity.
 *
 * @author Payne 646742615@qq.com
 * 2018/12/28 10:07
 */
public class ExternalLoadingDiscoverer extends EventSupportingDiscoverer implements Discoverer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** Class loader over the temporary copies of the shared libraries; replaced on every reload. */
    private volatile URLClassLoader libClassLoader;
    /** Discoverers of the loaded job archives, keyed by the absolute path of the archive file. */
    private final Map<String, Discoverer> discoverers = new ConcurrentHashMap<>();
    /** Assemblers fetched from the context, keyed by the file suffix they report. */
    private final Map<String, Assembler> assemblers = new HashMap<>();

    private String fileSystemURI = "file:///";
    private Map<String, Object> fileSystemENV = new HashMap<>();
    private ClassLoader fileSystemLoader = ObjKit.getClassLoader(this.getClass().getClassLoader());
    private String sharedLibFolder = "shared";
    private SharedLibWatcher sharedLibWatcher;
    private String sharedTmpFolder = "temp";
    private Set<String> extendJobFolders = new HashSet<>(Collections.singleton("job"));
    /** One watcher per job folder, so that every one of them can be closed on destroy. */
    private final List<ExtendJobWatcher> extendJobWatchers = new ArrayList<>();

    private long reloadMinInterval = 2L * 1000L;

    /**
     * Collects the assemblers of the context, loads the shared libraries and the job archives once
     * and starts one watcher thread for the shared library folder and one per job folder.
     *
     * @param context the context this discoverer is initialized with
     * @throws InitializingException if any step fails; watchers created so far are closed first
     */
    @Override
    protected void initializing(Context context) throws InitializingException {
        super.initializing(context);

        try {
            Map<String, Assembler> found = context.fetch(Assembler.class);
            for (Assembler assembler : found.values()) {
                assemblers.put(assembler.suffix(), assembler);
                if (assembler instanceof Lifecycle) {
                    ((Lifecycle) assembler).initialize(context);
                }
            }

            FileSystem fileSystem = resolveFileSystem();

            String sharedFolder = ResKit.absolutize(sharedLibFolder);
            File sharedDirectory = ResKit.mkdirs(sharedFolder);
            // The folder is registered before the initial load so that no change in between is missed;
            // the watcher is stored right away so that a failing initial load cannot leak it.
            sharedLibWatcher = new SharedLibWatcher(sharedDirectory, watch(fileSystem, sharedFolder));
            onSharedLibChanged(sharedDirectory);
            sharedLibWatcher.start();

            for (String folder : extendJobFolders) {
                String jobFolder = ResKit.absolutize(folder);
                File jobDirectory = ResKit.mkdirs(jobFolder);
                ExtendJobWatcher jobWatcher = new ExtendJobWatcher(jobDirectory, watch(fileSystem, jobFolder));
                // Every watcher is remembered; keeping only the last one would leave the others unclosed.
                extendJobWatchers.add(jobWatcher);
                jobWatcher.start();
            }
        } catch (Exception e) {
            closeWatchers();
            throw new InitializingException(e);
        }
    }

    /**
     * Closes the watchers and the shared library class loader, then destroys the assemblers that
     * have a lifecycle and all job discoverers.
     *
     * @throws DestroyingException if the super class fails to destroy
     */
    @Override
    protected void destroying() throws DestroyingException {
        super.destroying();

        closeWatchers();
        ResKit.close(libClassLoader);

        for (Assembler assembler : assemblers.values()) {
            if (assembler instanceof Lifecycle) {
                ((Lifecycle) assembler).destroy();
            }
        }
        assemblers.clear();

        for (Discoverer discoverer : discoverers.values()) {
            discoverer.destroy();
        }
        discoverers.clear();
    }

    /**
     * Looks up the file system of {@link #fileSystemURI}, creating it when it does not exist yet.
     */
    private FileSystem resolveFileSystem() throws Exception {
        URI uri = new URI(fileSystemURI);
        try {
            return FileSystems.getFileSystem(uri);
        } catch (FileSystemNotFoundException e) {
            return FileSystems.newFileSystem(uri, fileSystemENV, fileSystemLoader);
        }
    }

    /**
     * Opens a watch service registered for create, modify and delete events of the given folder.
     * The service is closed again when the registration fails.
     */
    private WatchService watch(FileSystem fileSystem, String folder) throws IOException {
        Path path = fileSystem.getPath(folder);
        WatchService watcher = fileSystem.newWatchService();
        try {
            path.register(watcher, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
            return watcher;
        } catch (IOException | RuntimeException e) {
            ResKit.close(watcher);
            throw e;
        }
    }

    /**
     * Closes the shared library watcher and every job folder watcher created so far.
     */
    private void closeWatchers() {
        ResKit.close(sharedLibWatcher);
        sharedLibWatcher = null;
        for (ExtendJobWatcher jobWatcher : extendJobWatchers) {
            ResKit.close(jobWatcher);
        }
        extendJobWatchers.clear();
    }

    /**
     * Destroys a discoverer that is no longer served, logging instead of propagating any failure.
     */
    private void destroyQuietly(Discoverer discoverer) {
        try {
            discoverer.destroy();
        } catch (Exception e) {
            logger.error("error occurred while destroying discoverer", e);
        }
    }

    /**
     * Rebuilds the shared library class loader from the {@code *.jar} files of the given directory
     * and reloads every file of the job folders against the new class loader.
     * <p>
     * The jars are copied into a new, randomly named sub-folder of the temporary folder and loaded
     * from there rather than from the watched directory itself.
     *
     * @param directory the shared library directory
     * @throws Exception if copying a library or reloading a job archive fails
     */
    private void onSharedLibChanged(File directory) throws Exception {
        File temporary = new File(ResKit.absolutize(sharedTmpFolder), UUID.randomUUID().toString());

        List<URL> urls = new ArrayList<>();
        Enumeration<Resource> libs = Loaders.ant(Loaders.file(directory)).load("*.jar");
        while (libs.hasMoreElements()) {
            Resource resource = libs.nextElement();
            File lib = new File(temporary, resource.getName());
            ResKit.mkdirs(lib.getParentFile());
            try (InputStream in = resource.getInputStream();
                 OutputStream out = new FileOutputStream(lib)) {
                ResKit.transfer(in, out);
            }
            urls.add(lib.toURI().toURL());
        }
        ResKit.close((Closeable) libClassLoader);
        libClassLoader = URLClassLoader.newInstance(urls.toArray(new URL[0]), fileSystemLoader);

        for (String job : extendJobFolders) {
            File[] files = new File(ResKit.absolutize(job)).listFiles();
            if (files == null) {
                // Folder does not exist (yet) or cannot be listed.
                continue;
            }
            for (File file : files) {
                if (file.isFile()) {
                    onExtendJobChanged(file);
                }
            }
        }
    }

    /**
     * Disassembles the given job archive and registers a discoverer for it under the absolute path
     * of the file; a discoverer previously registered for the same file is destroyed.
     * <p>
     * The file is disassembled repeatedly, one suffix at a time, by the assembler registered for
     * the current suffix until a directory is produced. When no directory results, a warning is
     * logged and nothing is registered.
     *
     * @param file the job archive
     * @throws Exception if disassembling the archive or initializing its discoverer fails
     */
    private void onExtendJobChanged(File file) throws Exception {
        File temporary = new File(ResKit.absolutize(sharedTmpFolder));

        File source = file;
        File target = null;
        while (true) {
            String name = source.getName();
            int index = name.lastIndexOf('.');
            if (index <= 0) {
                break;
            }
            String suffix = name.substring(index + 1);
            Assembler assembler = assemblers.get(suffix);
            if (assembler == null) {
                break;
            }
            target = new File(temporary, UUID.randomUUID() + File.separator + name.substring(0, index));
            ResKit.mkdirs(target.getParentFile());
            assembler.disassemble(source, target);
            source = target;
            if (source.isDirectory()) {
                break;
            }
        }
        if (target == null || !target.isDirectory()) {
            logger.warn("could not disassemble file: {}", file);
            return;
        }

        List<URL> urls = new ArrayList<>();
        Enumeration<Resource> libs = Loaders.ant(Loaders.file(target)).load("**.jar");
        while (libs.hasMoreElements()) {
            urls.add(libs.nextElement().getUrl());
        }

        URLClassLoader jobClassLoader = URLClassLoader.newInstance(urls.toArray(new URL[0]), libClassLoader);
        Discoverer discoverer = new URLServiceLoadingDiscoverer(jobClassLoader);
        try {
            discoverer.initialize(context);
        } catch (Exception e) {
            // Nobody else holds the class loader yet, so it has to be released here.
            ResKit.close(jobClassLoader);
            throw e;
        }

        // A reload of the shared libraries re-registers files that are already served; the replaced
        // discoverer must be destroyed or its class loader would never be closed.
        Discoverer previous = discoverers.put(file.getAbsolutePath(), discoverer);
        if (previous != null) {
            destroyQuietly(previous);
        }
    }

    /**
     * Daemon thread that polls a watch service of one directory until this discoverer is destroyed,
     * the thread is interrupted or the watch service is closed.
     */
    private abstract class EventWatcher extends Thread implements Thread.UncaughtExceptionHandler, Closeable {
        final File directory;
        final WatchService watcher;

        EventWatcher(File directory, WatchService watcher) {
            this.directory = directory;
            this.watcher = watcher;
            this.setDaemon(true);
            this.setUncaughtExceptionHandler(this);
        }

        /**
         * Polls the watch service with {@code reloadMinInterval} as timeout: received events go to
         * {@link #onWatched(List)}, a timeout without events goes to {@link #onInterval()}.
         * Failures of either callback are logged and do not stop the thread.
         */
        @Override
        public void run() {
            try {
                while (state() != Lifecycle.State.DESTROYED) {
                    WatchKey watchKey;
                    try {
                        watchKey = watcher.poll(reloadMinInterval, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        logger.warn(getName() + " was interrupted");
                        // Keep the interrupt status visible to whatever runs after the loop.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (ClosedWatchServiceException e) {
                        // close() shut the service down; this is the regular way to stop.
                        break;
                    }
                    try {
                        if (watchKey == null) {
                            onInterval();
                        } else {
                            onWatched(watchKey.pollEvents());
                        }
                    } catch (Throwable e) {
                        logger.warn(String.format("error occurred while handling directory [%s] change events", directory), e);
                    } finally {
                        // reset() returns false once the key is invalid, e.g. the directory is gone.
                        if (watchKey != null && !watchKey.reset()) {
                            logger.warn("directory [{}] is no longer watched", directory);
                        }
                    }
                }
            } finally {
                ResKit.close(watcher);
            }
        }

        /**
         * Called when a poll timed out without any event.
         */
        abstract void onInterval() throws Exception;

        /**
         * Called with the events of a signalled watch key.
         */
        abstract void onWatched(List<WatchEvent<?>> events) throws Exception;

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            ResKit.close(watcher);
            logger.warn(String.format("error occurred while directory [%s] change events", directory), e);
        }

        /**
         * Interrupts this thread and closes its watch service.
         */
        @Override
        public void close() throws IOException {
            this.interrupt();
            watcher.close();
        }
    }

    /**
     * Watcher of the shared library folder: remembers that a jar changed and reloads everything on
     * the next poll timeout.
     */
    private class SharedLibWatcher extends EventWatcher {
        private volatile boolean changed;

        SharedLibWatcher(File directory, WatchService watcher) {
            super(directory, watcher);
            this.setName("shared-lib-watcher[" + directory + "]");
        }

        @Override
        void onInterval() throws Exception {
            if (!changed) {
                return;
            }
            changed = false;
            doWritingCommand(new AtomicVTCommand<Exception>() {
                @Override
                public void run() throws Exception {
                    onSharedLibChanged(directory);
                }
            });
            fire(new ChangedEvent());
        }

        @Override
        void onWatched(List<WatchEvent<?>> events) {
            for (WatchEvent<?> event : events) {
                Object changedEntry = event.context();
                // An OVERFLOW event has no context: events were lost, so assume a jar changed.
                if (changedEntry == null || changedEntry.toString().endsWith(".jar")) {
                    changed = true;
                }
            }
        }
    }

    /**
     * Watcher of one job folder: collects the changed archive files and reloads them on the next
     * poll timeout.
     */
    private class ExtendJobWatcher extends EventWatcher {
        /** Files reported as changed since the last reload; only touched by this thread's callbacks. */
        private final Set<File> archives = new HashSet<>();

        ExtendJobWatcher(File directory, WatchService watcher) {
            super(directory, watcher);
            this.setName("extend-job-watcher[" + directory + "]");
        }

        @Override
        void onInterval() throws Exception {
            if (archives.isEmpty()) {
                return;
            }
            doWritingCommand(new AtomicVTCommand<Exception>() {
                @Override
                public void run() throws Exception {
                    Iterator<File> iterator = archives.iterator();
                    while (iterator.hasNext()) {
                        File archive = iterator.next();
                        // Removed up front: an archive that fails to load is not retried endlessly.
                        iterator.remove();
                        Discoverer discoverer = discoverers.remove(archive.getAbsolutePath());
                        if (discoverer != null) {
                            destroyQuietly(discoverer);
                        }
                        if (archive.exists() && archive.isFile()) {
                            onExtendJobChanged(archive);
                        }
                    }
                }
            });
            fire(new ChangedEvent());
        }

        @Override
        void onWatched(List<WatchEvent<?>> events) {
            for (WatchEvent<?> event : events) {
                Object changedEntry = event.context();
                if (changedEntry == null) {
                    // An OVERFLOW event has no context: events were lost, so re-check everything.
                    rescan();
                } else {
                    archives.add(new File(directory, changedEntry.toString()));
                }
            }
        }

        /**
         * Marks every entry currently in the directory, and every archive of this directory that is
         * currently served, as changed.
         */
        private void rescan() {
            File[] present = directory.listFiles();
            if (present != null) {
                archives.addAll(Arrays.asList(present));
            }
            for (String path : discoverers.keySet()) {
                File served = new File(path);
                if (directory.equals(served.getParentFile())) {
                    archives.add(served);
                }
            }
        }
    }

    /**
     * Service loading discoverer that closes its class loader when it is destroyed.
     */
    private static class URLServiceLoadingDiscoverer extends ServiceLoadingDiscoverer {
        private final URLClassLoader jobClassLoader;

        URLServiceLoadingDiscoverer(URLClassLoader jobClassLoader) {
            super(jobClassLoader);
            this.jobClassLoader = jobClassLoader;
        }

        @Override
        protected void destroying() throws DestroyingException {
            ResKit.close(jobClassLoader);
            super.destroying();
        }
    }

    /**
     * Asks the discoverers of the loaded job archives one after another and returns the first
     * result; a discoverer that throws a {@link DiscoveringException} is skipped.
     *
     * @param jobName  name of the job
     * @param jobGroup group of the job
     * @return the job returned by the first discoverer that does not throw
     * @throws DiscoveringException if every discoverer throws or none is loaded
     */
    @Override
    public Job discover(final String jobName, final String jobGroup) throws DiscoveringException {
        return doReadingCommand(new AtomicRTCommand<Job, DiscoveringException>() {
            @Override
            public Job run() throws DiscoveringException {
                for (Discoverer discoverer : discoverers.values()) {
                    try {
                        return discoverer.discover(jobName, jobGroup);
                    } catch (DiscoveringException ignored) {
                        /* this discoverer does not know the job, try the next one */
                    }
                }
                throw new DiscoveringException("jobGroup: " + jobGroup + ", jobName: " + jobName);
            }
        });
    }

    /**
     * @return the jobs of all loaded job archives as one enumeration
     */
    @Override
    public Enumeration<Job> jobs() {
        return doReadingCommand(new AtomicRNCommand<Enumeration<Job>>() {
            @Override
            public Enumeration<Job> run() {
                List<Enumeration<Job>> enumerations = new ArrayList<>();
                for (Discoverer discoverer : discoverers.values()) {
                    enumerations.add(discoverer.jobs());
                }
                return new MultipleEnumeration<>(enumerations);
            }
        });
    }

    /**
     * @return URI of the file system whose watch services are used, {@code file:///} by default
     */
    public String getFileSystemURI() {
        return fileSystemURI;
    }

    public void setFileSystemURI(String fileSystemURI) {
        this.fileSystemURI = fileSystemURI;
    }

    /**
     * @return environment passed on when the file system has to be created
     */
    public Map<String, Object> getFileSystemENV() {
        return fileSystemENV;
    }

    public void setFileSystemENV(Map<String, Object> fileSystemENV) {
        this.fileSystemENV = fileSystemENV;
    }

    /**
     * @return class loader passed on when the file system has to be created; also the parent of the
     * shared library class loader
     */
    public ClassLoader getFileSystemLoader() {
        return fileSystemLoader;
    }

    public void setFileSystemLoader(ClassLoader fileSystemLoader) {
        this.fileSystemLoader = fileSystemLoader;
    }

    /**
     * @return folder holding the shared library jars, {@code shared} by default
     */
    public String getSharedLibFolder() {
        return sharedLibFolder;
    }

    public void setSharedLibFolder(String sharedLibFolder) {
        this.sharedLibFolder = sharedLibFolder;
    }

    /**
     * @return folder receiving the library copies and the disassembled archives, {@code temp} by default
     */
    public String getSharedTmpFolder() {
        return sharedTmpFolder;
    }

    public void setSharedTmpFolder(String sharedTmpFolder) {
        this.sharedTmpFolder = sharedTmpFolder;
    }

    /**
     * @return folders holding the job archives, a single folder {@code job} by default
     */
    public Set<String> getExtendJobFolders() {
        return extendJobFolders;
    }

    public void setExtendJobFolders(Set<String> extendJobFolders) {
        this.extendJobFolders = extendJobFolders;
    }

    /**
     * @return poll timeout of the watcher threads in milliseconds, 2000 by default
     */
    public long getReloadMinInterval() {
        return reloadMinInterval;
    }

    public void setReloadMinInterval(long reloadMinInterval) {
        this.reloadMinInterval = reloadMinInterval;
    }
}
